package org.voovan.korla.rpc.service;

import org.voovan.korla.rpc.exception.RpcException;
import org.voovan.korla.rpc.service.selector.LoopSelector;
import org.voovan.korla.rpc.service.selector.NameSelector;
import org.voovan.tools.TEnv;
import org.voovan.tools.TObject;
import org.voovan.tools.log.Logger;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import org.voovan.korla.rpc.service.selector.RandomSelector;

/**
 * 为 RpcConsumer 服务自动生成的调用类进行标记
 * 在 ConsumeClass.template 中使用到
 *
 * @author: helyho
 * Korla Framework.
 * WebSite: https://github.com/helyho/Korla
 * Licence: Apache v2 License
 */
public class RpcService<T extends RpcService, K> {
    public BiFunction<K, RpcService, String> DEFAULT = new LoopSelector<K>();


    protected ThreadLocal<K> selectorNode = new ThreadLocal<>();

    private Map<String, RpcConsumer> container;
    private BiFunction<K, RpcService, String> selector = (BiFunction<K, RpcService, String>) DEFAULT;

    private int readTimeout = 5;
    private int sendTimeout = 5;
    private int minPoolSize = 1;
    private int maxPoolSize = 10;

    /**
     * 构造方法
     */
    public RpcService() {
        container = new ConcurrentHashMap<>();
    }

    /**
     * 构造方法
     * @param readTimeout 读超时时间
     * @param sendTimeout 写超时时间
     * @param minPoolSize 最小容量
     * @param maxPoolSize 最大容量
     */
    public RpcService(Integer readTimeout, Integer sendTimeout, Integer minPoolSize, Integer maxPoolSize) {
        this();
        this.readTimeout = readTimeout;
        this.sendTimeout = sendTimeout;
        this.minPoolSize = minPoolSize;
        this.maxPoolSize = maxPoolSize;
    }

    /**
     * 构造方法, 内部构造 RpcConsumer 对象
     * @param host Rpc 服务主机地址
     * @param port Rpc 服务主机端口
     * @param readTimeout 读超时时间
     * @param sendTimeout 写超时时间
     * @param minPoolSize 最小容量
     * @param maxPoolSize 最大容量
     */
    public RpcService(String host, Integer port, Integer readTimeout, Integer sendTimeout, Integer minPoolSize, Integer maxPoolSize) {
        this(readTimeout, sendTimeout, minPoolSize, maxPoolSize);
        addConsumer(host, port);

        //这种构造形式, 认为是单节点模式采用随机选择器
        selector = new RandomSelector<K>();
    }

    /**
     * 构造方法, 内部构造 RpcConsumer 对象
     * @param host Rpc 服务主机地址
     * @param port Rpc 服务主机端口
     */
    public RpcService(String host, Integer port) {
        this();
        addConsumer(host, port);

        //这种构造形式, 认为是单节点模式采用随机选择器
        selector = new RandomSelector<K>();
    }

    /**
     * 构造方法, 引用当前 Consumer 方法
     * @param consumer RpcConsumer 对象
     */
    public RpcService(RpcConsumer consumer) {
        this(consumer.getKorlaConsumerPool().getReadTimeout()/1000, consumer.getKorlaConsumerPool().getSendTimeout()/1000,
                consumer.getKorlaConsumerPool().getMinSize(), consumer.getKorlaConsumerPool().getMaxSize());
        addConsumer(consumer);

        //这种构造形式, 认为是单节点模式采用随机选择器
        selector = new RandomSelector<K>();
    }

    public int getReadTimeout() {
        return readTimeout;
    }

    public void setReadTimeout(int readTimeout) {
        this.readTimeout = readTimeout;
    }

    public int getSendTimeout() {
        return sendTimeout;
    }

    public void setSendTimeout(int sendTimeout) {
        this.sendTimeout = sendTimeout;
    }

    public int getMinPoolSize() {
        return minPoolSize;
    }

    public void setMinPoolSize(int minPoolSize) {
        this.minPoolSize = minPoolSize;
    }

    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }


    /**
     * 检查所有的节点状态
     * @param node node 名称
     * @param threshold 失败检查次数
     * @param interval 失败重复检查休眠时间
     * @return List&lt;node名称, status&gt;, status: true: 节点正常, false: 节点不存在或者节点被断开(自动移除断开节点)
     */
    public Map<String, Boolean> checkAll(int threshold, int interval){
        Map<String, Boolean> checkRet = new HashMap<String, Boolean>();
        for(String node : container.keySet()) {
            checkRet.put(node, check(node, threshold, interval));
        }

        return checkRet;
    }

    /**
     * 检查指定的节点状态
     * @param node node 名称
     * @param threshold 失败检查次数
     * @param interval 失败重复检查休眠时间
     * @return true: 节点正常, false: 节点不存在或者节点被断开(自动移除断开节点)
     */
    public boolean check(String node, int threshold, int interval){
        RpcConsumer consumer = container.get(node);

        if(consumer == null){
            return false;
        }else if(!consumer.getKorlaConsumerPool().isAvaliable()) {
            for(int i=0;i<threshold;i++) {
                if(consumer.getKorlaConsumerPool().isAvaliable()) {
                    return true;
                } else {
                    TEnv.sleep(interval);
                }
            }
            consumer.getKorlaConsumerPool().close();
            return false;
        } else {
            return true;
        }
    }

    /**
     * 添加一个 RpcConsumer
     * @param node           服务的 node 名称
     * @param consumer      RpcConsumer 对象
     * @return RpcGateway 对象
     */
    public synchronized RpcService<T, K> addConsumer(String node, RpcConsumer consumer) {
        node = node==null ? "Node-" + container.size() : node;

        RpcConsumer usingConsumer = container.get(node);
        if(usingConsumer!=null) {
            if (!usingConsumer.getKorlaConsumerPool().isAvaliable()) {
                usingConsumer.getKorlaConsumerPool().close();
                container.remove(node);
            } else {
                return this;
            }
        }

        if(container.putIfAbsent(node, consumer)!=null) {
            Logger.warnf("Create RpcService failed, node is exists {}" + node);
        }
        return this;
    }

    /**
     * 添加一个 RpcConsumer
     * @param consumer      RpcConsumer 对象
     * @return RpcGateway 对象
     */
    public RpcService<T, K> addConsumer(RpcConsumer consumer) {
        return addConsumer(null, consumer);
    }

    /**
     * 添加一个 RpcConsumer
     * @param rpcServiceClazz Rpc 服务的类
     * @param node 服务的 node 名称
     * @param host Rpc 服务主机地址
     * @param port Rpc 服务主机端口
     * @return RpcGateway 对象
     */
    public synchronized RpcService<T, K> addConsumer(String node, String host, int port) {

        node = node==null ? "Node-" + container.size() : node;

        RpcConsumer usingConsumer = container.get(node);
        if(usingConsumer!=null) {
            if (usingConsumer.getKorlaConsumerPool().isAvaliable()) {
                usingConsumer.getKorlaConsumerPool().close();
                container.remove(node);
            } else {
                return this;
            }
        }

        RpcConsumer rpcConsumer = RpcConsumer.newInstance(host, port, readTimeout * 1000, sendTimeout * 1000, minPoolSize, maxPoolSize);
        container.put(node, rpcConsumer);

        return this;
    }

    /**
     * 添加一个 RpcConsumer
     * @param host Rpc 服务主机地址
     * @param port Rpc 服务主机端口
     * @return RpcGateway 对象
     */
    public RpcService<T, K> addConsumer(String host, int port) {
        return addConsumer(null, host,port);
    }

    /**
     * 移除一个 RpcConsumer
     * @param node 服务的 node 名称
     * @return 被移除的 RPC 服务
     */
    public synchronized RpcConsumer removeConsumer(String node) {
        return container.remove(node);
    }

    /**
     * 获得所有的 RpcConsumer
     * @return Rpc 服务的 Map
     */
    public Map<String, RpcConsumer> getAllConsumer() {
        return container;
    }

    /**
     * 按指定的 node 判断 RpcConsumer 是否存在
     * @param node Rpc 服务的 node 名称
     * @return true: 存在, false: 不存在
     */
    public boolean exists(String node) {
        return container.containsKey(node);
    }

    /**
     * 按指定的 node 获取一个 RpcConsumer
     * @param node Rpc 服务的 node 名称
     * @return Rpc 服务对象
     */
    public RpcConsumer getByName(String node) {
        RpcConsumer rpcConsumer = container.get(node);
        return rpcConsumer;
    }

    /**
     * 获取网关的服务选择器
     * @return 网关的服务选择器
     */
    public BiFunction<K, RpcService, String> selector() {
        return selector;
    }

    /**
     * 设置一个网关的服务选择器
     * @param selector 网关的服务选择器 &lt;Rpc参数, Rpc对象, node 名称&gt;
     */
    public void selector(BiFunction<K, RpcService, String> selector) {
        this.selector = selector;
    }

    /**
     * 使用选择器,选一个 RpcConsumer
     * @param k node 名称
     * @return Rpc 服务对象
     */
    public RpcConsumer getBySelector(K k) {
        return getByName(selector.apply(k, this));
    }

    /**
     * 设置用于服务选择器的 node 的 key
     * @param k 用于服务选择的 node 的 key
     * @return RpcService 对象
     */
    public T node(K k) {
        selectorNode.set(k);
        return (T)this;
    }
}
